Fix deferrable Beam Dataflow operators failing with 400 when job ID is missing from stdout#69102
Conversation
… missing from stdout When the Dataflow launcher process runs with WARNING log level (the default), it does not emit the "Created job with id" line that the Beam operator parses to capture the Dataflow job ID. This left dataflow_job_id as None, causing the deferrable trigger to fail with "400 Request must contain a job and project id". Fix by adding a periodic_callback parameter to run_beam_command() that is invoked roughly every 5 seconds while the launcher subprocess is running. The deferrable Beam operators now pass a callback that polls DataflowHook.fetch_job_id_by_name() to resolve the job ID by name. Once the ID is set, the stdout-reading loop exits early so the operator can truly defer, freeing the Airflow worker while the Dataflow job continues running on Google Cloud. Fixes apache#68279
SameerMesiah97
left a comment
There was a problem hiding this comment.
So I just scanned the diff, and 2 things just came to mind:
- Since this polling happens before the operator can defer, does this not increase the amount of time a worker slot is occupied? One of the main benefits of the deferrable path is freeing the worker as early as possible.
- This feels like it's introducing a waiter via a generic callback. If we're going this far, why not introduce a dedicated waiter/helper instead?
I think these higher-level concerns need to be addressed before a line-by-line review.
|
Thanks for the review, @SameerMesiah97 — good questions. 1. Worker slot occupancy The polling does not add any extra occupancy time. The worker was already occupied for the full duration of the launcher subprocess run before my change — If anything, this fix can only reduce occupancy: when the job ID is resolved via the Dataflow API before the launcher finishes printing it to stdout, 2. Generic callback vs. dedicated waiter The The callback pattern is already the established idiom here — Happy to rename it or restructure if you have a specific shape in mind for the dedicated helper. |
MaksYermak
left a comment
There was a problem hiding this comment.
@gingeekrishna thank you for the PR. Have you run the dataflow system tests for this changes?
About solution we do not need additional callback we can use existing one is_dataflow_job_id_exist_callback and extends it with logic which is checking for dataflow_id using dataflow_job_name. Additionally, these changes for callback will work only with additional changes to several other methods in beam hook which needed because otherwise the fd blocks the execution process.
I almost finish the fix for this issue and prepare a PR later on this week
|
Thanks for the detailed review, @MaksYermak! We haven't run the full Dataflow system tests — we only exercised unit tests and the existing beam hook test suite locally. Your point about using the existing Since you're close to finishing your own fix with the correct approach, we're happy to close this PR and defer to yours. We'd rather avoid parallel work that heads in incompatible directions. Just let us know if there's anything from this PR you'd find useful to carry forward, or if you'd prefer we simply close it now. |
@gingeekrishna here is a PR VladaZakharova#325 with changes. This PR is blocked by #66952 because in the current time |
|
Thanks for the context @MaksYermak — that's really helpful to understand the full picture. Makes sense that everything is gated on the provider un-suspension in #66952 first. Given that your approach at VladaZakharova#325 is the correct design and is already waiting behind #66952, Shall I go ahead and close this PR so there's no noise?. Once #66952 lands and the beam provider is active again, your fix can move forward cleanly without any parallel confusion from ours. Let me know if there's anything from this PR you'd like to pull in, or if you'd like a review on your branch once the block is cleared. |
Closes #68279
Problem
When the Dataflow launcher subprocess runs with the default WARNING log level, it does not emit the
"Created job with id: [...]"line that the Beam operator parses to capture the Dataflow job ID. This leavesdataflow_job_id = None.The previous PRs (#67711, #68720) addressed this by adding a fallback after the launcher subprocess finished — but as reviewer @MaksYermak correctly noted, that is not the root cause fix: by the time the launcher exits, the Dataflow job may have already completed, so deferral never gets a chance to free the Airflow worker.
Root Cause Fix
The correct fix is to capture the job ID during the stdout-reading loop, before the launcher finishes, so the operator can truly defer.
Changes
providers/google/.../hooks/dataflow.pyDataflowHook.fetch_job_id_by_name(prefix_name, location, project_id)— looks up a Dataflow job by name prefix via the API, returning its ID.providers/apache/beam/.../hooks/beam.pyimport timeperiodic_callback: Callable[[], None] | None = Noneparameter torun_beam_command(),_start_pipeline(),start_python_pipeline(), andstart_java_pipeline()run_beam_command(): invokeperiodic_callback()roughly every 5 seconds while the subprocess is running (usingtime.monotonic()tracking). After each periodic call, checkis_dataflow_job_id_exist_callback()and exit early if the ID has been resolved — before the subprocess finishes.providers/apache/beam/.../operators/beam.pyBeamDataflowMixin.__get_dataflow_job_id_poll_callback(): returns a closure that callsDataflowHook.fetch_job_id_by_name()and setsself.dataflow_job_idwhen a matching job is found; silently retries on transient errors.BeamRunPythonPipelineOperator.execute_on_dataflow()andBeamRunJavaPipelineOperator.execute_on_dataflow()to create and pass this callback.How this fixes the issue
dataflow_job_idis set,is_dataflow_job_id_exist_callback()returnsTrue, and the stdout-reading loop exits immediately — before the Dataflow job finishes.This path is the same whether or not the launcher emits a job-ID line to stdout. If stdout does emit the line,
process_line_callbacksetsdataflow_job_idand the loop exits on the nextis_dataflow_job_id_exist_callback()check, as before.Tests
periodic_callback=Noneinrun_beam_commandmock assertions (all callers that don't pass a periodic_callback).test_exec_dataflow_runnertests to includeperiodic_callback=mock.ANY.test_exec_dataflow_runner_periodic_callback_fetches_job_idfor bothBeamRunPythonPipelineOperatorandBeamRunJavaPipelineOperator: captures theperiodic_callbackpassed by the operator, calls it directly, and asserts thatdataflow_job_idis set by pollingfetch_job_id_by_name.Checklist
periodic_callbackdefaults toNone; existing callers are unaffectedexecute_on_dataflow)py_compileproviders/apache/beam/newsfragments/68279.bugfix.rst